import tensorflow as tf
tf.enable_eager_execution()
import functools
import matplotlib.pyplot as plt
import numpy as np
import pdb
# Download the class repository.
# NOTE: lines starting with `!` or `%` are IPython/Colab magics (shell command
# and change-directory respectively); they only run inside a notebook kernel.
! git clone https://github.com/aamini/introtodeeplearning_labs.git > /dev/null 2>&1
% cd introtodeeplearning_labs
! git pull
% cd ..
# Copy the lab1 folder next to the notebook, then copy its util.py alongside.
# NOTE(review): the second copy reads from /content/lab1, so it presumably
# assumes the working directory is /content -- confirm.
!cp -r introtodeeplearning_labs/lab1 lab1
!cp -r /content/lab1/util.py util.py
# Import the necessary class-specific utility files for this lab.
# `util` is bound to the cloned `introtodeeplearning_labs` package itself.
import introtodeeplearning_labs as util
# Fetch the training data: both images from CelebA and ImageNet
path_to_training_data = tf.keras.utils.get_file(
    fname='train_face.h5',
    origin='https://www.dropbox.com/s/l5iqduhe0gwxumq/train_face.h5?dl=1')

# Wrap the downloaded file in the lab's dataset loader and pull one batch of
# 100 examples to look at.
loader = util.TrainingDatasetLoader(path_to_training_data)
number_of_training_examples = loader.get_train_size()
images, labels = loader.get_batch(100)

#@title Change the sliders to look at positive and negative training examples! { run: "auto" }
# Split the batch by label: rows labelled 1 are faces, rows labelled 0 are not.
face_images = images[np.where(labels == 1)[0]]
not_face_images = images[np.where(labels == 0)[0]]

idx_face = 39 #@param {type:"slider", min:0, max:50, step:1}
idx_not_face = 39 #@param {type:"slider", min:0, max:50, step:1}

# Show the selected face / non-face example side by side.
plt.figure(figsize=(4, 2))
for _panel, (_title, _stack, _index) in enumerate(
        (("Face", face_images, idx_face),
         ("Not Face", not_face_images, idx_not_face)), 1):
    plt.subplot(1, 2, _panel)
    plt.imshow(_stack[_index])
    plt.title(_title)
    plt.grid(False)

#@title Change the sliders to look at positive and negative training examples! { run: "auto" }
# Handler for the PPB evaluation set; display sample faces for one
# (gender, skin colour) group.
ppb = util.PPBFaceEvaluator(skip=4) # create the dataset handler
gender = "female" #@param ["male", "female"]
skin_color = "darker" #@param ["lighter", "darker"]
img = ppb.get_sample_faces_from_demographic(gender, skin_color)
plt.imshow(img)
plt.grid(False)
n_outputs = 1 # number of outputs (i.e., face or not face)
n_filters = 12 # base number of convolutional filters


def make_standard_classifier():
    """Build the baseline CNN face classifier.

    The network is four conv + batch-norm stages (n, 2n, 4n and 6n filters,
    where n is the module-level ``n_filters``), followed by flatten, dropout
    and a linear layer with ``n_outputs`` units.

    Returns:
        An un-compiled ``tf.keras.Sequential`` model whose output is a raw
        logit (no sigmoid is applied).
    """
    Conv2D = functools.partial(tf.keras.layers.Conv2D, padding='same', activation='relu')
    BatchNormalization = tf.keras.layers.BatchNormalization
    Flatten = tf.keras.layers.Flatten
    Dense = tf.keras.layers.Dense

    model = tf.keras.Sequential([
        # n_filters 5x5 filters, 2x2 stride
        Conv2D(n_filters, 5, strides=(2, 2)),
        BatchNormalization(),
        # 2*n_filters 5x5 filters, 2x2 stride
        Conv2D(2 * n_filters, 5, strides=(2, 2)),
        BatchNormalization(),
        # 4*n_filters 3x3 filters, 2x2 stride
        Conv2D(4 * n_filters, 3, strides=(2, 2)),
        BatchNormalization(),
        # 6*n_filters 3x3 filters, 1x1 stride
        Conv2D(6 * n_filters, 3, strides=(1, 1)),
        BatchNormalization(),
        Flatten(),
        # Dropout regularises the flattened features. It must come before the
        # output layer: applied after it, dropout would zero or rescale the
        # logit itself whenever the model runs in training mode.
        tf.keras.layers.Dropout(0.5),
        Dense(n_outputs, activation=None),
    ])
    return model
standard_classifier = make_standard_classifier()

batch_size = 36
num_epochs = 10 # keep small to run faster
learning_rate = 1e-3

optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate) # define our optimizer
loss_history = util.LossHistory(smoothing_factor=0.99) # to record the evolution of the loss
plotter = util.PeriodicPlotter(sec=2, scale='semilogy')

# The training loop!
# NOTE(review): the model is called without an explicit `training=True`, so
# whether Dropout / BatchNormalization run in training mode depends on the
# Keras learning-phase default -- confirm this is intended.
for epoch in range(num_epochs):
    custom_msg = util.custom_progress_text("Epoch: %(epoch).0f Loss: %(loss)2.2f")
    bar = util.create_progress_bar(custom_msg)

    for idx in bar(range(loader.get_train_size()//batch_size)):
        # First grab a batch of training data and convert the input images to tensors
        x, y = loader.get_batch(batch_size)
        x = tf.convert_to_tensor(x, dtype=tf.float32)
        y = tf.convert_to_tensor(y, dtype=tf.float32)

        # GradientTape to record differentiation operations
        with tf.GradientTape() as tape:
            logits = standard_classifier(x) # feed the images into the model
            loss_value = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=logits) # compute the loss

        # Mean batch loss, computed once and reused for display and history.
        mean_loss = loss_value.numpy().mean()
        custom_msg.update_mapping(epoch=epoch, loss=mean_loss)

        # Backpropagation. Differentiate only w.r.t. trainable weights:
        # `.variables` also contains non-trainable BatchNorm statistics, for
        # which the tape returns None gradients.
        trainable = standard_classifier.trainable_variables
        grads = tape.gradient(loss_value, trainable)
        optimizer.apply_gradients(zip(grads, trainable), global_step=tf.train.get_or_create_global_step())

        loss_history.append(mean_loss)
        plotter.plot(loss_history.get())
# Evaluate on a subset of CelebA+Imagenet
(batch_x, batch_y) = loader.get_batch(5000)
# Logits -> probabilities -> hard 0/1 predictions.
y_pred_standard = tf.round(tf.nn.sigmoid(standard_classifier.predict(batch_x)))
acc_standard = tf.reduce_mean(tf.cast(tf.equal(batch_y, y_pred_standard), tf.float32))
# Single-argument print(...) behaves the same on Python 2 and Python 3.
print("Standard CNN accuracy on (potentially biased) training set: {:.4f}".format(acc_standard.numpy()))

# Evaluate on PPB dataset (takes ~3 minutes)
# Iteration order (lighter/male, lighter/female, darker/male, darker/female)
# matches the tick labels LM, LF, DM, DF below.
standard_cnn_accuracy = []
for skin_color in ['lighter', 'darker']:
    for gender in ['male', 'female']:
        standard_cnn_accuracy.append(ppb.evaluate([standard_classifier], gender, skin_color, from_logit=True)[0])
        print("")
        print("{} {}: {}".format(gender, skin_color, standard_cnn_accuracy[-1]))

plt.bar(range(4), standard_cnn_accuracy)
plt.xticks(range(4), ('LM', 'LF', 'DM', 'DF'))
plt.ylim(np.min(standard_cnn_accuracy)-0.1, np.max(standard_cnn_accuracy)+0.1)
plt.ylabel('Accuracy')
We can see evidence of bias: accuracy is considerably higher for lighter skin tones. There may also be a slight bias in favor of females over males.
def vae_loss_function(x, x_pred, mu, logsigma, kl_weight=0.0005, per_sample=False):
    """Compute the VAE loss: weighted KL (latent) term plus reconstruction term.

    ``logsigma`` is used as a log-variance: ``exp(logsigma)`` enters the KL
    term as the variance, matching ``sampling`` which uses
    ``exp(0.5 * logsigma)`` as the standard deviation.

    Args:
        x: input batch.
        x_pred: reconstruction of ``x`` (same shape as ``x``).
        mu: encoded latent means; the per-sample path assumes shape
            (batch, latent_dim).
        logsigma: encoded latent log-variances, same shape as ``mu``.
        kl_weight: weight applied to the latent (KL) term.
        per_sample: if False (default), reduce over the whole batch and
            return a scalar. If True, return one loss per example with shape
            (batch, 1), so the caller can mask individual examples.

    Returns:
        ``kl_weight * latent_loss + reconstruction_loss`` as a scalar tensor,
        or a (batch, 1) tensor when ``per_sample`` is True.
    """
    kl_terms = mu**2 - logsigma + tf.exp(logsigma) - 1
    squared_error = (x - x_pred) ** 2

    if per_sample:
        # Sum the KL terms over the latent axis only.
        latent_loss = 0.5 * tf.reduce_sum(kl_terms, axis=1, keepdims=True)
        # Average the squared error over every non-batch axis.
        flat_error = tf.reshape(squared_error, [tf.shape(squared_error)[0], -1])
        reconstruction_loss = tf.reduce_mean(flat_error, axis=1, keepdims=True)
    else:
        latent_loss = 0.5 * tf.reduce_sum(kl_terms)
        reconstruction_loss = tf.reduce_mean(squared_error)

    vae_loss = kl_weight * latent_loss + reconstruction_loss
    return vae_loss


def sampling(args):
    """Reparameterization trick: sample z using unit-Gaussian noise.

    Args:
        args: pair ``(z_mean, z_logsigma)`` -- mean and log-variance of the
            latent distribution Q(z|X).

    Returns:
        z (tensor): sampled latent vector, same shape as ``z_mean``.
    """
    z_mean, z_logsigma = args
    # by default, random_normal has mean=0 and std=1.0
    epsilon = tf.random_normal(tf.shape(z_mean))
    # exp(0.5 * log-variance) is the standard deviation.
    z = z_mean + tf.exp(0.5 * z_logsigma) * epsilon
    return z


# Loss function for DB-VAE
def debiasing_loss_function(x, x_pred, y, y_logit, mu, logsigma):
    """Compute the DB-VAE loss: classification loss plus face-only VAE loss.

    Args:
        x: input image batch.
        x_pred: reconstructed images.
        y: labels; entries equal to 1 mark faces.
        y_logit: classification logits, same shape as ``y``.
        mu: encoded latent means.
        logsigma: encoded latent log-variances.

    Returns:
        Tuple ``(total_loss, classification_loss)``: the scalar mean total
        loss and the un-reduced per-example classification loss.
    """
    # Per-example VAE loss. A batch-wide scalar cannot be masked per example:
    # every face would carry the non-face images' VAE loss as well.
    vae_loss = vae_loss_function(x, x_pred, mu, logsigma, kl_weight=0.0005, per_sample=True)

    classification_loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=y_logit)

    # Use the training data labels to create variable face_mask
    face_mask = tf.cast(tf.equal(y, 1), tf.float32)

    # Align shapes so the sum below is element-wise rather than broadcast
    # into a (batch, batch) matrix.
    vae_loss = tf.reshape(vae_loss, tf.shape(classification_loss))

    # Only faces (mask == 1) contribute their VAE loss.
    total_loss = tf.reduce_mean(classification_loss + face_mask * vae_loss)
    return total_loss, classification_loss
latent_dim = 100


def make_face_encoder_network():
    """Define the encoder network for the DB-VAE.

    The encoder takes 64x64x3 inputs through four conv + batch-norm stages,
    flattens, and produces three linear heads plus a sampled latent vector.

    Returns:
        Tuple ``(encoder, inputs, outputs, pre_flatten_shape)`` where
        ``encoder`` is the ``tf.keras.Model``, ``inputs`` its input tensor,
        ``outputs`` the list ``[y_logit, z_mean, z_logsigma, z]`` and
        ``pre_flatten_shape`` the shape (without batch axis) of the last
        feature map before flattening.
    """
    Conv2D = functools.partial(tf.keras.layers.Conv2D, padding='same', activation='relu')
    BatchNormalization = tf.keras.layers.BatchNormalization
    Flatten = tf.keras.layers.Flatten
    Dense = tf.keras.layers.Dense

    inputs = tf.keras.layers.Input(shape=(64, 64, 3))

    hidden = Conv2D(filters=1*n_filters, kernel_size=[5, 5], strides=[2, 2])(inputs)
    hidden = BatchNormalization()(hidden)
    hidden = Conv2D(filters=2*n_filters, kernel_size=[5, 5], strides=[2, 2])(hidden)
    hidden = BatchNormalization()(hidden)
    hidden = Conv2D(filters=4*n_filters, kernel_size=[3, 3], strides=[2, 2])(hidden)
    hidden = BatchNormalization()(hidden)
    hidden = Conv2D(filters=6*n_filters, kernel_size=[3, 3], strides=[1, 1])(hidden)
    hidden = BatchNormalization()(hidden)

    # Shape of the final convolutional output (right before the flatten).
    pre_flatten_shape = hidden.shape[1:]
    hidden = Flatten(name='flatten')(hidden)

    # Encoder outputs:
    #   y_logit:    supervised class prediction (raw logit)
    #   z_mean:     means in the latent space
    #   z_logsigma: log-variances in the latent space
    # All three heads are linear. A ReLU on z_mean / z_logsigma would force
    # them to be non-negative, i.e. means >= 0 and variances >= 1.
    y_logit = Dense(1, activation=None, name='y_logit')(hidden)
    z_mean = Dense(latent_dim, activation=None, name='z_mean')(hidden)
    z_logsigma = Dense(latent_dim, activation=None, name='z_logsigma')(hidden)

    # use reparameterization trick to sample from the latent space
    z = tf.keras.layers.Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_logsigma])

    # define the outputs that the encoder model should return
    outputs = [y_logit, z_mean, z_logsigma, z]

    # finalize the encoder model
    encoder = tf.keras.Model(inputs=inputs, outputs=outputs, name='encoder')

    return encoder, inputs, outputs, pre_flatten_shape
def make_face_decoder_network(pre_flatten_shape):
    """Define the decoder network for the DB-VAE.

    Args:
        pre_flatten_shape: shape (without batch axis) of the feature map the
            latent vector is projected back to before the transposed
            convolutions.

    Returns:
        A ``tf.keras.Model`` named 'decoder' mapping a ``latent_dim`` vector
        to a 3-channel output.
    """
    layers = tf.keras.layers
    deconv = functools.partial(layers.Conv2DTranspose, padding='same', activation='relu')

    latent_inputs = layers.Input(shape=(latent_dim,))

    # Project the latent vector up and restore the feature-map layout.
    features = layers.Dense(tf.reduce_prod(pre_flatten_shape), activation='relu')(latent_inputs)
    features = layers.Reshape(pre_flatten_shape)(features)

    # series of deconvolutional layers with batch normalization:
    # (filters, kernel size, strides) for each stage, in order
    stage_specs = (
        (4*n_filters, [3, 3], [1, 1]),
        (2*n_filters, [3, 3], [2, 2]),
        (1*n_filters, [5, 5], [2, 2]),
    )
    for n_out, kernel, stride in stage_specs:
        features = deconv(filters=n_out, kernel_size=kernel, strides=stride)(features)
        features = layers.BatchNormalization()(features)

    # Final transposed convolution produces the 3-channel reconstruction.
    x_hat = deconv(filters=3, kernel_size=[5, 5], strides=[2, 2])(features)

    # instantiate decoder model
    return tf.keras.Model(inputs=latent_inputs, outputs=x_hat, name='decoder')
# Assemble the DB-VAE: build encoder and decoder, then chain them into one
# end-to-end model. The names bound here (encoder, decoder, vae, ...) are
# module-level and are used by the training and evaluation code further down.
'''TODO: create the encoder and decoder networks'''
encoder, inputs, outputs, pre_flatten_shape = make_face_encoder_network()
decoder = make_face_decoder_network(pre_flatten_shape)
# initialize the models
# Call the encoder on its own input tensor and unpack its four outputs.
encoder_output = encoder(inputs)
y_logit, z_mean, z_logsigma, z = encoder_output
# Decode the sampled latent vector z back into an image.
reconstructed_inputs = decoder(z)
# Full model: input image -> reconstruction.
vae = tf.keras.Model(inputs, reconstructed_inputs)
util.display_model(encoder)
def get_latent_mu(images, encoder, batch_size=1024):
    """Return the encoder's latent means for a batch of images.

    Images are processed in chunks of ``batch_size``; each chunk is cast to
    float32 and divided by 255 before being passed to ``encoder``.

    Args:
        images: array of images indexed along axis 0.
        encoder: callable returning a 4-tuple whose second element is the
            latent mean for the given batch.
        batch_size: number of images encoded per call.

    Returns:
        NumPy array of shape ``(len(images), latent_dim)``, where
        ``latent_dim`` is the module-level latent size.
    """
    N = images.shape[0]
    mu = np.zeros((N, latent_dim))
    # `range` works on both Python 2 and 3 (`xrange` is Python-2 only).
    for start_ind in range(0, N, batch_size):
        # Clamp to N: the last valid exclusive end index.
        end_ind = min(start_ind + batch_size, N)
        batch = images[start_ind:end_ind]
        batch = tf.convert_to_tensor(batch, dtype=tf.float32)/255.
        _, batch_mu, _, _ = encoder(batch)
        mu[start_ind:end_ind] = batch_mu
    return mu
def get_training_sample_probabilities(images, encoder, bins=10, smoothing_fac=0.0):
    """Recompute sampling probabilities from the latent distribution.

    Each image is encoded to its latent mean. For every latent dimension a
    histogram of the means is built, and each image receives a probability
    inversely proportional to the (smoothed) density of its bin. An image's
    final probability is the maximum over all latent dimensions, normalised
    to sum to 1.

    Args:
        images: array of images passed to ``get_latent_mu``.
        encoder: encoder model passed to ``get_latent_mu``.
        bins: number of histogram bins per latent dimension.
        smoothing_fac: constant added to every bin density before inversion;
            larger values flatten the resulting probabilities.

    Returns:
        1-D NumPy array with one sampling probability per image.
    """
    print("Recomputing the sampling probabilities")

    mu = get_latent_mu(images, encoder)

    # sampling probabilities for the images
    training_sample_p = np.zeros(mu.shape[0])

    # consider the distribution for each latent variable
    for i in range(mu.shape[1]):
        latent_distribution = mu[:, i]

        # generate a histogram of the latent distribution
        hist_density, bin_edges = np.histogram(latent_distribution, density=True, bins=bins)

        # Open the outer edges so every sample lands in a bin index 1..bins.
        # https://docs.scipy.org/doc/numpy-1.13.0/reference/generated/numpy.digitize.html
        bin_edges[0] = -float('inf')
        bin_edges[-1] = float('inf')

        # find which latent bin every data sample falls in
        bin_index = np.digitize(latent_distribution, bin_edges)

        # smooth the density function and renormalise it
        hist_smoothed_density = hist_density + smoothing_fac
        hist_smoothed_density = hist_smoothed_density / np.sum(hist_smoothed_density)

        # Invert the density. digitize indices are 1-based relative to
        # hist_density (bin_edges has bins + 1 entries), hence the -1.
        p = 1 / hist_smoothed_density[bin_index - 1]

        # normalize all probabilities
        p = p / np.sum(p)

        # keep, per image, the largest probability seen over all dimensions
        training_sample_p = np.maximum(p, training_sample_p)

    # final normalization
    training_sample_p /= np.sum(training_sample_p)

    return training_sample_p
# DB-VAE training. Hyperparameters and state for this run (these rebind the
# names used by the standard-classifier training above).
loss_history = []
batch_size = 36
num_epochs = 10 # keep small to run faster
learning_rate = 1e-3
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
enable_debiasing = True
all_faces = loader.get_all_train_faces() # parameter from data loader

for epoch in range(num_epochs):
    # progress message and bar
    custom_msg = util.custom_progress_text("Epoch: %(epoch).0f Iter: %(idx).0f Class Loss: %(class_loss)2.2f Loss: %(loss)2.2f")
    bar = util.create_progress_bar(custom_msg)

    # Recompute the data sampling probabilities once per epoch if debiasing
    # is enabled; None leaves the loader's default sampling in place.
    p_faces = None
    if enable_debiasing:
        p_faces = get_training_sample_probabilities(all_faces, encoder, bins=10, smoothing_fac=0.0)

    for idx in bar(range(loader.get_train_size()//batch_size)):
        # load a batch of data
        (x, y) = loader.get_batch(batch_size, p_pos=p_faces)
        x = tf.convert_to_tensor(x, dtype=tf.float32)
        y = tf.convert_to_tensor(y, dtype=tf.float32)

        # define GradientTape for automatic differentiation
        with tf.GradientTape() as tape:
            y_logit, mu, logsigma, z = encoder(x)
            x_hat = decoder(z)
            loss, class_loss = debiasing_loss_function(x, x_hat, y, y_logit, mu, logsigma)

        # Differentiate only w.r.t. trainable weights: `.variables` also
        # contains non-trainable BatchNorm statistics, for which the tape
        # returns None gradients.
        trainable = vae.trainable_variables
        grads = tape.gradient(loss, trainable)

        # apply gradients to variables
        optimizer.apply_gradients(zip(grads, trainable),
                                  global_step=tf.train.get_or_create_global_step())

        # track the losses
        class_loss_value = class_loss.numpy().mean()
        loss_value = loss.numpy().mean()
        loss_history.append((class_loss_value, loss_value))
        custom_msg.update_mapping(epoch=epoch, idx=idx, loss=loss_value, class_loss=class_loss_value)

        # plot the progress every 100 steps
        if idx%100 == 0:
            util.plot_sample(x, y, vae)
# Evaluate on PPB dataset (takes ~4 minutes)
# The encoder's first output (output_idx=0) is the classification logit.
accuracy_debiased = []
for skin_color in ['lighter', 'darker']:
    for gender in ['male', 'female']:
        accuracy_debiased.append(ppb.evaluate([encoder], gender, skin_color, output_idx=0, from_logit=True)[0])
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("")
        print("{} {}: {}".format(gender, skin_color, accuracy_debiased[-1]))

# Grouped bar chart: standard classifier vs. debiased classifier per group.
bar_width = 0.3
plt.bar(np.arange(4), standard_cnn_accuracy, width=bar_width)
plt.bar(np.arange(4)+bar_width, accuracy_debiased, width=bar_width)
plt.legend(('Standard Classifier', 'Debiased Classifier (DB-VAE)'))
plt.xticks(np.arange(4), ('LM', 'LF', 'DM', 'DF'))
plt.ylim(np.min([standard_cnn_accuracy, accuracy_debiased])-0.1, 1)
plt.ylabel('Accuracy')
Notice how much closer the accuracies for the different genders and skin tones have become, and that accuracy increased for every group.
def run_smooth_factor(factor_smooth, num_epochs=10, batch_size=36,
                      learning_rate=1e-3, bins=10, enable_debiasing=True):
    """Train a fresh DB-VAE with a given smoothing factor and evaluate it.

    Builds new encoder/decoder networks, trains them with resampling
    probabilities recomputed every epoch, evaluates the encoder on the PPB
    dataset and plots its accuracy next to the standard classifier's.

    Relies on the module-level ``loader``, ``ppb``, ``util`` and
    ``standard_cnn_accuracy``.

    Args:
        factor_smooth: ``smoothing_fac`` passed to
            ``get_training_sample_probabilities``.
        num_epochs: number of training epochs.
        batch_size: examples per training step.
        learning_rate: Adam learning rate.
        bins: histogram bins used when recomputing sampling probabilities.
        enable_debiasing: if False, batches are drawn without resampling
            probabilities (``p_pos=None``).

    Returns:
        List of four PPB accuracies in the order lighter/male,
        lighter/female, darker/male, darker/female.
    """
    encoder, inputs, outputs, pre_flatten_shape = make_face_encoder_network()
    decoder = make_face_decoder_network(pre_flatten_shape)

    # Chain encoder and decoder into the end-to-end VAE.
    _, _, _, z = encoder(inputs)
    vae = tf.keras.Model(inputs, decoder(z))

    optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
    all_faces = loader.get_all_train_faces() # parameter from data loader

    for epoch in range(num_epochs):
        # progress message and bar
        custom_msg = util.custom_progress_text("Epoch: %(epoch).0f Iter: %(idx).0f Class Loss: %(class_loss)2.2f Loss: %(loss)2.2f")
        bar = util.create_progress_bar(custom_msg)

        # Recompute data sampling probabilities if debiasing is enabled.
        p_faces = None
        if enable_debiasing:
            p_faces = get_training_sample_probabilities(all_faces, encoder, bins=bins, smoothing_fac=factor_smooth)

        for idx in bar(range(loader.get_train_size()//batch_size)):
            # load a batch of data
            (x, y) = loader.get_batch(batch_size, p_pos=p_faces)
            x = tf.convert_to_tensor(x, dtype=tf.float32)
            y = tf.convert_to_tensor(y, dtype=tf.float32)

            # define GradientTape for automatic differentiation
            with tf.GradientTape() as tape:
                y_logit, mu, logsigma, z = encoder(x)
                x_hat = decoder(z)
                loss, class_loss = debiasing_loss_function(x, x_hat, y, y_logit, mu, logsigma)

            # Differentiate only w.r.t. trainable weights (BatchNorm
            # statistics in `.variables` yield None gradients).
            trainable = vae.trainable_variables
            grads = tape.gradient(loss, trainable)

            # apply gradients to variables
            optimizer.apply_gradients(zip(grads, trainable),
                                      global_step=tf.train.get_or_create_global_step())

            # report the losses
            class_loss_value = class_loss.numpy().mean()
            loss_value = loss.numpy().mean()
            custom_msg.update_mapping(epoch=epoch, idx=idx, loss=loss_value, class_loss=class_loss_value)

            # plot the progress every 100 steps
            if idx%100 == 0:
                util.plot_sample(x, y, vae)

    # Evaluate on PPB dataset (takes ~4 minutes)
    accuracy_debiased = []
    for skin_color in ['lighter', 'darker']:
        for gender in ['male', 'female']:
            accuracy_debiased.append(ppb.evaluate([encoder], gender, skin_color, output_idx=0, from_logit=True)[0])
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print("")
            print("{} {}: {}".format(gender, skin_color, accuracy_debiased[-1]))

    # Start a new figure so successive runs do not draw on top of each other.
    plt.figure()
    bar_width = 0.3
    plt.bar(np.arange(4), standard_cnn_accuracy, width=bar_width)
    plt.bar(np.arange(4)+bar_width, accuracy_debiased, width=bar_width)
    plt.legend(('Standard Classifier', 'Debiased Classifier (DB-VAE)'))
    plt.xticks(np.arange(4), ('LM', 'LF', 'DM', 'DF'))
    plt.ylim(np.min([standard_cnn_accuracy, accuracy_debiased])-0.1, 1)
    plt.ylabel('Accuracy')
    plt.title('smoothing_fac = {}'.format(factor_smooth))

    return accuracy_debiased
# Sweep the smoothing factor: one full train-and-evaluate run per value,
# from the least to the most smoothing.
for _smoothing in (0.2, 0.5, 1, 2):
    run_smooth_factor(_smoothing)